OOP for project writers #104
base: master
Conversation
Walkthrough

This refactoring converts the writer modules from procedural code with module-level state to a class-based OOP architecture. An abstract Writer base class provides a consistent interface (write, check_health). Handlers now receive a writers registry via dependency injection instead of importing and initializing writers directly.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: 4 passed ✅, 1 failed ❌ (1 warning)
```python
            operation,
            "location",
            "format",
            format_options,
            additional_info
        )
        VALUES
        (
            %s, %s, %s, %s, %s, %s, %s, %s
        )""",
        )
        VALUES
        (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
        )""",
        (
            message["event_id"],
            job.get("country", ""),
            job["catalog_id"],
            job["status"],
            job["timestamp_start"],
            job["timestamp_end"],
            job.get("message"),
            (json.dumps(job.get("additional_info")) if "additional_info" in job else None),
            message["tenant_id"],
            message["source_app"],
            message["source_app_version"],
            message["environment"],
            message["timestamp_event"],
            message.get("country", ""),
            message["catalog_id"],
            message["operation"],
            message.get("location"),
            message["format"],
            (json.dumps(message.get("format_options")) if "format_options" in message else None),
            (json.dumps(message.get("additional_info")) if "additional_info" in message else None),
        ),
    )


def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None:
    """Insert a test topic row.

    Args:
        cursor: Database cursor.
        table: Target table name.
        message: Event payload.
    """
    logger.debug("Sending to Postgres - %s", table)
    cursor.execute(
        f"""
        INSERT INTO {table}
        (
            event_id,
            tenant_id,
            source_app,
            environment,
            timestamp_event,
            additional_info


def _postgres_run_write(self, cursor: Any, table_runs: str, table_jobs: str, message: Dict[str, Any]) -> None:
    """Insert a run event row plus related job rows.

    Args:
        cursor: Database cursor.
        table_runs: Runs table name.
        table_jobs: Jobs table name.
        message: Event payload (includes jobs array).
    """
    logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs)
```
Check failure: Code scanning / AquaSec - sqlalchemy safe query execution (High)
```python
        )
        VALUES
        (
            %s, %s, %s, %s, %s, %s, %s, %s
        )""",
        (
            message["event_id"],
            message["job_ref"],
            message["tenant_id"],
            message["source_app"],
            message["source_app_version"],
            message["environment"],
            message["timestamp_start"],
            message["timestamp_end"],
        ),
    )
        )
        VALUES
        (
            %s, %s, %s, %s, %s, %s
        )""",
        (
            message["event_id"],
            message["tenant_id"],
            message["source_app"],
            message["environment"],
            message["timestamp"],
            (json.dumps(message.get("additional_info")) if "additional_info" in message else None),
        ),
    )


def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Dispatch insertion for a topic into the correct Postgres table(s).

    Skips if Postgres is not configured or psycopg2 is unavailable.
    Returns a success flag and an optional error.

    Args:
        topic_name: Incoming topic identifier.
        message: Event payload.
    """
    try:
        if not POSTGRES.get("database"):
            logger.debug("No Postgres - skipping")
            return True, None
        if psycopg2 is None:  # type: ignore
            logger.debug("psycopg2 not available - skipping actual Postgres write")
            return True, None

        log_payload_at_trace(logger, "Postgres", topic_name, message)

        with psycopg2.connect(  # type: ignore[attr-defined]
            database=POSTGRES["database"],
            host=POSTGRES["host"],
            user=POSTGRES["user"],
            password=POSTGRES["password"],
            port=POSTGRES["port"],
        ) as connection:  # type: ignore[call-arg]
            with connection.cursor() as cursor:  # type: ignore
                if topic_name == "public.cps.za.dlchange":
                    postgres_edla_write(cursor, "public_cps_za_dlchange", message)
                elif topic_name == "public.cps.za.runs":
                    postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message)
                elif topic_name == "public.cps.za.test":
                    postgres_test_write(cursor, "public_cps_za_test", message)
                else:
                    msg = f"unknown topic for postgres {topic_name}"
                    logger.error(msg)
                    return False, msg

            connection.commit()  # type: ignore
    except (RuntimeError, PsycopgError) as e:  # narrowed exception set
        err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
        logger.exception(err_msg)
        return False, err_msg
    for job in message["jobs"]:
        cursor.execute(
            f"""
            INSERT INTO {table_jobs}
            (
                event_id,
                country,
                catalog_id,
                status,
                timestamp_start,
                timestamp_end,
```
Check failure: Code scanning / AquaSec - sqlalchemy safe query execution (High)
```python
        )
        VALUES
        (
            %s, %s, %s, %s, %s, %s, %s, %s
        )""",
        (
            message["event_id"],
            job.get("country", ""),
            job["catalog_id"],
            job["status"],
            job["timestamp_start"],
            job["timestamp_end"],
            job.get("message"),
            (json.dumps(job.get("additional_info")) if "additional_info" in job else None),
        ),
    )


def _postgres_test_write(self, cursor: Any, table: str, message: Dict[str, Any]) -> None:
    """Insert a test topic row.

    Args:
        cursor: Database cursor.
        table: Target table name.
        message: Event payload.
    """
    logger.debug("Sending to Postgres - %s", table)
    cursor.execute(
```
Check failure: Code scanning / AquaSec - sqlalchemy safe query execution (High)
```python
        (
            %s, %s, %s, %s, %s, %s
        )""",
        (
            message["event_id"],
            message["tenant_id"],
            message["source_app"],
            message["environment"],
            message["timestamp"],
            (json.dumps(message.get("additional_info")) if "additional_info" in message else None),
        ),
    )

    return True, None

def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Dispatch insertion for a topic into the correct Postgres table(s).

    Args:
        topic_name: Incoming topic identifier.
        message: JSON-serializable payload.

    Returns:
        Tuple of (success: bool, error_message: Optional[str]).
    """
    try:
```
Check failure: Code scanning / AquaSec - sqlalchemy safe query execution (High)
AquaSec has completed a full security repository scan ✅ You can find the analysis results for this PR branch on this overview.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@src/event_gate_lambda.py`:
- Around line 84-88: WriterPostgres.__init__ performs eager secret fetching via
aws_secrets.get_secret_value when POSTGRES_SECRET_NAME/POSTGRES_SECRET_REGION
are set, which can raise exceptions during Lambda cold-start; refactor
WriterPostgres so its constructor only stores config and env values, and move
any call to aws_secrets.get_secret_value (and parsing of returned secret) into a
lazy initializer invoked from WriterPostgres.write() and
WriterPostgres.check_health(), ensuring those methods guard against repeated
initialization and surface errors there instead of in __init__.
In `@src/writers/writer_postgres.py`:
- Around line 292-295: The error message inside the except block catching
(RuntimeError, PsycopgError) as e is grammatically incorrect; update the err_msg
construction so it reads "The Postgres writer failed with unknown error" (keep
including the exception string via str(e)), then call logger.exception with that
corrected err_msg and return the same False, err_msg tuple; locate the except
block that defines err_msg and uses logger.exception to make this change.
- Around line 53-64: The constructor (__init__) is performing AWS Secrets
Manager calls (boto3.Session().client and get_secret_value) which may raise
exceptions; change it to only read and store the environment variables (e.g.,
secret_name, secret_region) and initialize _db_config to a safe default (e.g.,
{"database": ""}) without making network calls. Add a lazy-initialization step
in write() (or a helper like _load_db_config_if_needed / _ensure_db_config)
that, on first write, uses boto3 to fetch and json.loads the secret into
self._db_config, catching and surfacing BotoCoreError/ClientError appropriately
(and caching the loaded config so subsequent writes don't re-fetch). Ensure
unique symbols referenced: __init__, _db_config, write (or
_load_db_config_if_needed).
In `@src/writers/writer.py`:
- Around line 1-16: The file header in src/writers/writer.py uses "Copyright
2026" which is inconsistent with other files using "Copyright 2025"; update the
copyright year in the header comment from 2026 to 2025 to match the repository
convention (no code changes required).
🧹 Nitpick comments (6)

tests/writers/test_writer_eventbridge.py (1)

57-68: Remove the redundant import inside the test function. `BotoCoreError` is already imported at line 18, so the local import at line 58 is unnecessary.

Suggested fix:

```diff
 def test_write_client_error():
-    from botocore.exceptions import BotoCoreError
-
     class DummyError(BotoCoreError):
         pass
```

tests/handlers/test_handler_health.py (1)
69-86: Test name doesn't match test behavior. The function `test_get_health_kafka_not_initialized` actually tests when all writers are failing, not just Kafka. Consider renaming for clarity.

Suggested rename:

```diff
-def test_get_health_kafka_not_initialized():
-    """Health check returns 503 when Kafka writer is not initialized."""
+def test_get_health_all_writers_failing():
+    """Health check returns 503 when all writers report failures."""
```

src/writers/writer_kafka.py (2)
53-82: Minor: simplify a redundant condition. Line 60 has a redundant check: `"kafka_bootstrap_server" not in self.config` and `not self.config.get("kafka_bootstrap_server")` overlap. The `.get()` check alone is sufficient, since it handles both a missing key and a falsy value.

♻️ Suggested simplification:

```diff
-        if "kafka_bootstrap_server" not in self.config or not self.config.get("kafka_bootstrap_server"):
+        if not self.config.get("kafka_bootstrap_server"):
             return None
```
101-167: LGTM - well-structured write logic with retry handling. The `write()` method correctly implements:
- Lazy producer initialization
- Graceful skip when not configured
- Error aggregation from produce callback and flush exceptions
- Retry loop with backoff for transient flush failures
- Appropriate logging levels (warning vs error vs exception)
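The retry-with-backoff flush listed above can be sketched roughly as follows; `flush_with_retry` and its parameters are illustrative names (assumptions, not the module's actual API), and the `flush` argument mimics `confluent_kafka.Producer.flush()`, which returns the number of messages still queued.

```python
import time


def flush_with_retry(flush, attempts: int = 3, backoff_s: float = 0.0) -> int:
    """Retry a Kafka-style flush until the queue drains or attempts run out.

    Returns the final number of messages still waiting in the queue.
    """
    remaining = flush()
    for _ in range(attempts - 1):
        if remaining == 0:
            break
        time.sleep(backoff_s)  # simple constant backoff between attempts
        remaining = flush()
    return remaining
```

A fake producer whose queue drains over successive calls (for example 5, then 2, then 0) would make this return 0 within three attempts, while a producer that stays stuck surfaces a nonzero count the caller can turn into an error.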
Regarding the static analysis hint on line 132 (`msg` unused in lambda): this is intentional, since the Kafka callback signature requires both `err` and `msg` parameters. Consider using `_msg` to signal intent.

♻️ Optional: use an underscore prefix to indicate the unused parameter:

```diff
-                callback=lambda err, msg: (errors.append(str(err)) if err is not None else None),
+                callback=lambda err, _msg: (errors.append(str(err)) if err is not None else None),
```

src/writers/writer_eventbridge.py (1)
44-49: Constructor correctly avoids throwing operations and aligns with the PR objectives. The constructor only stores configuration and initializes internal state. The boto3 client creation is deferred to the first `write()` call (lazy initialization), which adheres to the requirement from issue #94 to "avoid any initialization logic in constructors that can throw exceptions."

Minor type hint issue: `Optional["boto3.client"]` is technically incorrect, since `boto3.client` is a factory function, not a type.

💡 Optional: fix the type hint:

```diff
-        self._client: Optional["boto3.client"] = None
+        self._client: Optional[Any] = None  # boto3 EventBridge client
```

Or, for stricter typing, use `Optional[botocore.client.BaseClient]` with an appropriate import.

src/writers/writer_postgres.py (1)
78-110: SQL injection risk is mitigated, but the pattern is fragile. The static analysis correctly flags the f-string table-name interpolation. While the current implementation is safe because table names are hardcoded internally (lines 280-285), this pattern is risky for future maintenance:

- Future developers may not realize the security implications
- The pattern could be copied elsewhere with user-controlled input

The data values are correctly parameterized with `%s` placeholders, which is good.

💡 Consider using psycopg2's identifier quoting:

```python
from psycopg2 import sql

# Instead of: cursor.execute(f"INSERT INTO {table} ...")
# Use:
cursor.execute(
    sql.SQL("INSERT INTO {} ...").format(sql.Identifier(table)),
    (...)
)
```

Alternatively, define table names as module-level constants with explicit validation.

Also applies to: 140-162, 177-199, 223-241
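As an illustration of the "module-level constants with explicit validation" alternative, here is a minimal sketch using the four table names from this PR; `build_insert` is a hypothetical helper, not code from the repository.

```python
# Allowlist of the tables this writer is permitted to touch.
ALLOWED_TABLES = frozenset({
    "public_cps_za_dlchange",
    "public_cps_za_runs",
    "public_cps_za_runs_jobs",
    "public_cps_za_test",
})


def build_insert(table: str, columns: list) -> str:
    """Build an INSERT statement, guarding the interpolated table name."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"unknown table: {table!r}")
    placeholders = ", ".join(["%s"] * len(columns))
    # Data values stay parameterized; only the validated identifier is interpolated.
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
```

Any table name outside the allowlist raises before a query string is ever constructed, which keeps the f-string pattern safe even if a call site later takes user input.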
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- src/event_gate_lambda.py
- src/handlers/handler_health.py
- src/handlers/handler_topic.py
- src/writers/writer.py
- src/writers/writer_eventbridge.py
- src/writers/writer_kafka.py
- src/writers/writer_postgres.py
- tests/handlers/test_handler_health.py
- tests/handlers/test_handler_topic.py
- tests/utils/test_trace_logging.py
- tests/writers/test_writer_eventbridge.py
- tests/writers/test_writer_kafka.py
- tests/writers/test_writer_postgres.py
🧰 Additional context used
🧬 Code graph analysis (11)
tests/utils/test_trace_logging.py (3)
- src/writers/writer_eventbridge.py (2): WriterEventBridge (38-122), write (61-104)
- src/writers/writer_kafka.py (2): write (101-167), WriterKafka (42-187)
- src/writers/writer_postgres.py (2): write (252-297), WriterPostgres (47-318)

src/writers/writer_eventbridge.py (2)
- src/utils/trace_logging.py (1): log_payload_at_trace (29-48)
- src/writers/writer.py (3): Writer (25-59), write (35-47), check_health (50-59)

tests/writers/test_writer_eventbridge.py (1)
- src/writers/writer_eventbridge.py (3): WriterEventBridge (38-122), write (61-104), check_health (106-122)

src/writers/writer.py (3)
- src/writers/writer_kafka.py (2): write (101-167), check_health (169-187)
- src/writers/writer_eventbridge.py (2): write (61-104), check_health (106-122)
- src/writers/writer_postgres.py (2): write (252-297), check_health (299-318)

src/handlers/handler_health.py (4)
- src/writers/writer.py (2): Writer (25-59), check_health (50-59)
- src/writers/writer_kafka.py (1): check_health (169-187)
- src/writers/writer_eventbridge.py (1): check_health (106-122)
- src/writers/writer_postgres.py (1): check_health (299-318)

tests/writers/test_writer_postgres.py (1)
- src/writers/writer_postgres.py (6): WriterPostgres (47-318), _postgres_edla_write (67-126), _postgres_run_write (128-210), _postgres_test_write (212-250), write (252-297), check_health (299-318)

src/writers/writer_kafka.py (3)
- src/utils/trace_logging.py (1): log_payload_at_trace (29-48)
- src/writers/writer.py (3): Writer (25-59), write (35-47), check_health (50-59)
- tests/writers/test_writer_kafka.py (7): flush (33-34), flush (49-56), flush (65-67), flush (76-78), produce (28-31), produce (38-40), produce (110-111)

src/event_gate_lambda.py (5)
- src/writers/writer_eventbridge.py (1): WriterEventBridge (38-122)
- src/writers/writer_kafka.py (1): WriterKafka (42-187)
- src/writers/writer_postgres.py (1): WriterPostgres (47-318)
- src/handlers/handler_topic.py (2): HandlerTopic (38-155), load_topic_schemas (56-72)
- src/handlers/handler_health.py (1): HandlerHealth (33-75)

src/writers/writer_postgres.py (2)
- src/utils/trace_logging.py (1): log_payload_at_trace (29-48)
- src/writers/writer.py (3): Writer (25-59), write (35-47), check_health (50-59)

tests/writers/test_writer_kafka.py (1)
- src/writers/writer_kafka.py (3): WriterKafka (42-187), write (101-167), check_health (169-187)

tests/handlers/test_handler_health.py (1)
- src/handlers/handler_health.py (2): HandlerHealth (33-75), get_health (42-75)
🪛 GitHub Check: AquaSec
src/writers/writer_postgres.py
[failure] 89-138: sqlalchemy safe query execution
[failure] 151-185: sqlalchemy safe query execution
[failure] 188-222: sqlalchemy safe query execution
[failure] 234-262: sqlalchemy safe query execution

All four findings carry the same description: merging user inputs directly into SQL queries without precautions can lead to SQL injection vulnerabilities. It is highly recommended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.
🪛 Ruff (0.14.11)
tests/utils/test_trace_logging.py
48-48: Unused method argument: timeout
(ARG002)
src/writers/writer_eventbridge.py
120-120: Consider moving this statement to an else block
(TRY300)
tests/writers/test_writer_postgres.py
209-209: Unused function argument: reset_env
(ARG001)
216-216: Unused function argument: reset_env
(ARG001)
224-224: Unused function argument: reset_env
(ARG001)
233-233: Unused function argument: reset_env
(ARG001)
243-243: Unused function argument: reset_env
(ARG001)
245-245: Unused method argument: kwargs
(ARG002)
255-255: Unused function argument: reset_env
(ARG001)
270-270: Unused function argument: reset_env
(ARG001)
290-290: Unused function argument: reset_env
(ARG001)
src/writers/writer_kafka.py
132-132: Unused lambda argument: msg
(ARG005)
185-185: Consider moving this statement to an else block
(TRY300)
src/writers/writer_postgres.py
78-110: Possible SQL injection vector through string-based query construction
(S608)
140-162: Possible SQL injection vector through string-based query construction
(S608)
177-199: Possible SQL injection vector through string-based query construction
(S608)
223-241: Possible SQL injection vector through string-based query construction
(S608)
293-293: Use explicit conversion flag
Replace with conversion flag
(RUF010)
tests/writers/test_writer_kafka.py
33-33: Unused method argument: timeout
(ARG002)
49-49: Unused method argument: timeout
(ARG002)
65-65: Unused method argument: timeout
(ARG002)
🔇 Additional comments (41)
tests/writers/test_writer_eventbridge.py (3)

1-21: LGTM! Well-structured test file for the refactored EventBridge writer. The imports and file setup are clean. The tests properly instantiate `WriterEventBridge` with per-instance configuration, aligning with the new OOP architecture.

26-39: Good test coverage for write() happy paths. Tests correctly verify:

- Skipping when `event_bus_arn` is empty
- Successful write with a mocked client
- Proper assertion that `put_events` is called

74-96: Good health check test coverage. The health check tests properly cover:

- Not-configured state (empty `event_bus_arn`)
- Success scenario with mocked `boto3.client`
- Client error scenario with a proper `BotoCoreError` subclass

The `fmt` attribute on line 91 is correctly added to satisfy `BotoCoreError`'s format string requirement.

tests/handlers/test_handler_health.py (4)
23-28: Well-designed helper function for test mocking. The `_create_mock_writer` helper provides a clean, reusable way to create mock writers with specific `check_health` return values, supporting the dependency injection pattern.

33-48: Good test for minimal healthy state. The test correctly validates that health returns 200 when Kafka is healthy and optional writers are "not configured" (which is a healthy state per the Writer interface contract).

89-116: Good coverage for individual writer disabled states. Tests properly verify that disabling EventBridge or Postgres (returning `(True, "not configured")`) still results in a healthy overall status.

137-163: Good uptime and integration tests. The uptime test validates the response structure, and the integration test ensures the full lambda handler path works with the mocked writers from conftest.
tests/writers/test_writer_postgres.py (6)
38-89: Thorough tests for the Postgres insert helper methods. Tests properly verify:

- `_postgres_edla_write` with optional and missing optional fields
- Correct parameter positioning and JSON serialization

91-154: Good coverage for the run and test write helpers. The `_postgres_run_write` test validates both the run insert and multiple job inserts with correct parameter handling. `_postgres_test_write` properly tests the test table insertion.

159-164: The `reset_env` fixture is intentionally used for side-effect cleanup. The static analysis warning about the unused `reset_env` argument is a false positive. This is a standard pytest pattern where fixtures are included for their setup/teardown effects rather than direct usage. The fixture cleans up environment variables after each test.

206-253: Good write() behavioral path coverage. Tests properly verify:

- Skip when no database is configured
- Skip when psycopg2 is missing
- Error on unknown topic
- Success on known topic
- Exception handling returns an error

255-307: Good init and topic-specific write tests. Tests verify:

- Secret loading from AWS Secrets Manager
- `dlchange` topic write success
- `runs` topic write success (validates 2 inserts: run + job)

313-331: Good check_health() coverage. Tests cover:

- Not-configured state (empty database)
- Success state (all config present)
- Missing host detection

Consider adding tests for other missing config fields (user, password, port) for completeness, though this is optional since the logic is similar.
src/writers/writer.py (2)
25-48: Well-designed abstract base class. The `Writer` ABC provides a clean, minimal interface for all writers with:

- Configuration storage in `__init__`
- `write()` with clear success/failure return semantics
- A documented convention for skip scenarios returning `(True, None)`

This aligns well with the concrete implementations in `WriterKafka`, `WriterEventBridge`, and `WriterPostgres`.

49-59: Good health check contract documentation. The docstring clearly defines the three possible health states:

- `(True, "ok")`: configured and working
- `(True, "not configured")`: not configured, skipped
- `(False, "error message")`: configured but failing

This convention is consistently implemented across all concrete writers.
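The contract documented in these comments can be approximated as follows. This is a sketch based on the review's description, not the actual contents of src/writers/writer.py; `NullWriter` is an invented example subclass showing the not-configured convention.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple


class Writer(ABC):
    """Approximation of the abstract Writer base class described above."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config  # configuration storage only; nothing that can throw

    @abstractmethod
    def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Return (True, None) on success or skip, (False, error) on failure."""

    @abstractmethod
    def check_health(self) -> Tuple[bool, str]:
        """Return (True, "ok"), (True, "not configured"), or (False, "error message")."""


class NullWriter(Writer):
    """A writer without configuration: skips writes and reports so."""

    def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        return True, None

    def check_health(self) -> Tuple[bool, str]:
        return True, "not configured"
```

Because `write` and `check_health` are abstract, forgetting to implement either in a new writer fails at instantiation time rather than at dispatch time.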
tests/utils/test_trace_logging.py (4)
17-23: Clean module imports for testing. The explicit module imports allow proper access to both the writer classes and their module-level loggers for setting the trace level.

25-38: Good EventBridge trace logging test. The test properly:

- Sets trace level on the module's logger
- Creates the writer with event_bus_arn
- Mocks the client
- Verifies the trace payload appears in logs

41-61: The FakeProducer timeout parameter is intentionally unused. The `timeout` parameter on `flush` (line 48) matches the real `Producer.flush(timeout)` signature that `WriterKafka._flush_with_timeout` calls. The static analysis warning is a false positive; the parameter exists for interface compatibility, not for use in this mock.

64-107: Good Postgres trace logging test. The test properly:

- Monkeypatches psycopg2 with a dummy implementation
- Sets trace level on the module's logger
- Creates the writer and sets `_db_config`
- Verifies the trace payload appears in logs
src/handlers/handler_health.py (3)
26-26: LGTM! Clean import of the `Writer` abstract base class, aligning with the new OOP architecture.

38-40: LGTM! The constructor now accepts a `writers` dictionary via dependency injection, which improves testability and decouples `HandlerHealth` from specific writer implementations. This is a well-designed change that aligns with the PR objectives.

55-58: LGTM! The generalized health check loop cleanly iterates over all writers and aggregates failures. This replaces hardcoded per-writer checks with a uniform approach, making it easy to add or remove writers without modifying this code.
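The generalized loop praised here can be sketched as follows; `StubWriter` and `get_health` are illustrative stand-ins (assumptions, not the handler's actual code) that follow the `(ok, detail)` convention from the Writer contract.

```python
from typing import Dict, List, Tuple


class StubWriter:
    """Hypothetical writer exposing only check_health() -> (ok, detail)."""

    def __init__(self, ok: bool, detail: str) -> None:
        self._result = (ok, detail)

    def check_health(self) -> Tuple[bool, str]:
        return self._result


def get_health(writers: Dict[str, StubWriter]) -> Tuple[int, List[str]]:
    """Iterate every registered writer and aggregate failures uniformly."""
    failures = []
    for name, writer in writers.items():
        ok, detail = writer.check_health()
        if not ok:
            failures.append(f"{name}: {detail}")
    return (200 if not failures else 503), failures
```

Adding or removing a writer changes only the registry passed in, never this loop; "not configured" writers report `(True, ...)` and therefore do not count as failures.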
tests/handlers/test_handler_topic.py (5)
28-34: LGTM! Good test setup with a mock writers dictionary. This properly aligns with the new constructor signature that accepts a `writers` parameter via dependency injection.

143-146: LGTM! Clean approach to mock all writers uniformly for success scenarios. Iterating over `writers.values()` makes the test resilient to adding new writers.

162-166: LGTM! Good pattern for testing a single writer failure: explicitly setting each writer's mock return value makes the test intent clear and verifiable.

183-187: LGTM! Proper setup for testing multiple writer failures. The assertion on line 198 correctly verifies that both failure types are reported.

202-205: LGTM! Consistent mocking pattern for the lowercase bearer header test case.
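The multiple-writer failure collection these tests verify can be sketched as follows; `FakeWriter` and `dispatch` are invented stand-ins approximating the handler behavior described in this review, not the repository's code.

```python
from typing import Any, Dict, List, Optional, Tuple


class FakeWriter:
    """Hypothetical writer stub exposing only write() -> (ok, error)."""

    def __init__(self, ok: bool, err: Optional[str] = None) -> None:
        self._result = (ok, err)

    def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        return self._result


def dispatch(writers: Dict[str, FakeWriter], topic_name: str, message: Dict[str, Any]) -> List[Dict[str, str]]:
    """Send to every registered writer, collecting a type/message record per failure."""
    errors = []
    for name, writer in writers.items():
        ok, err = writer.write(topic_name, message)
        if not ok:
            errors.append({"type": name, "message": err})
    return errors
```

One failing writer does not stop dispatch to the others; every failure surfaces in the returned list, which is what the multiple-failure test above asserts against.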
src/handlers/handler_topic.py (3)
31-31: LGTM! Clean import of the `Writer` abstract base class.

43-54: LGTM! Well-structured constructor with dependency injection of the `writers` registry. This design cleanly separates handler logic from writer instantiation, improving testability and aligning with the OOP refactoring goals.

139-142: LGTM! The unified writer iteration loop is a significant improvement. It:

- Treats all writers uniformly via the `Writer` interface
- Collects errors with a descriptive type/message structure
- Enables easy addition of new writers without code changes here
- Sets the foundation for future async dispatch (per issue #94)

src/event_gate_lambda.py (2)
32-34: LGTM! Explicit imports of the concrete writer classes. This makes dependencies clear and avoids wildcard imports.
91-94: LGTM! Clean wiring of the writers registry into both `HandlerTopic` and `HandlerHealth` via dependency injection. This enables uniform health checks and message dispatch across all writers.

src/writers/writer_kafka.py (4)
17-20: LGTM! Clear module docstring describing the Kafka writer's purpose.

42-51: LGTM! Clean class definition with proper inheritance from `Writer`. The constructor correctly defers producer creation to lazy initialization, aligning with issue #94's requirement to avoid throwing in constructors.

84-99: LGTM! Good defensive helper method. The `TypeError` catch handles stubs or older client versions that don't support the timeout parameter.

169-187: LGTM! The `check_health()` method properly implements the `Writer` contract:

- Returns `(True, "not configured")` when the bootstrap server is missing
- Lazily initializes the producer during the health check if needed
- Returns `(False, "producer initialization failed")` if creation fails
- Catches `KafkaException` and returns error details

The static analysis hint (TRY300) about moving line 185 to an `else` block is a minor style preference and doesn't affect correctness.

tests/writers/test_writer_kafka.py (2)
24-78: LGTM - the fake producer classes correctly implement the interface. The static analysis warnings about unused `timeout` parameters (lines 33, 49, 65) are false positives: these fake/mock classes intentionally accept but ignore the `timeout` parameter to match the real Kafka Producer interface signature. The `FakeProducerTypeError` class (line 76) intentionally omits the parameter to test the TypeError fallback path.

84-183: Test coverage for the new OOP design looks comprehensive. The tests properly exercise:

- Lazy-initialization skip when not configured
- Success and error paths for write operations
- Kafka exception handling
- Flush retry logic with various producer behaviors
- Health check scenarios (not configured, success, initialization failure)

The dependency injection pattern using `writer._producer = FakeProducer*()` is appropriate for unit testing.

src/writers/writer_eventbridge.py (2)
61-104: LGTM - the write method implementation is clean and follows the lazy-initialization pattern. The method properly:

- Skips when not configured (returns `True, None`)
- Lazily initializes the boto3 client on first use
- Logs the payload at trace level before publishing
- Handles AWS-specific exceptions with clear error messages

106-122: The health check implementation is correct. The static analysis hint (TRY300) about moving the return statement to an `else` block is a minor style suggestion. The current structure is functionally correct and readable.

src/writers/writer_postgres.py (1)
299-318: The health check only validates config presence, not connectivity. Unlike the Kafka and EventBridge writers, which attempt to create clients during `check_health()`, the Postgres writer only checks whether configuration values are present. This means a health check could return `(True, "ok")` even if the database is unreachable.

This may be intentional, to avoid establishing database connections during health checks, but it is inconsistent with the other writers' behavior.

Is this the intended behavior? If actual connectivity testing is desired, consider adding an optional connection test:

```python
# Optional: test actual connectivity
with psycopg2.connect(...) as conn:
    conn.cursor().execute("SELECT 1")
```
```python
writers = {
    "kafka": WriterKafka(config),
    "eventbridge": WriterEventBridge(config),
    "postgres": WriterPostgres(config),
}
```
🧩 Analysis chain
🏁 Script executed:
find . -name "writer_postgres.py" -o -name "event_gate_lambda.py" | head -20Repository: AbsaOSS/EventGate
Length of output: 120
🏁 Script executed:
# Check WriterPostgres implementation
if [ -f "src/writer_postgres.py" ]; then
echo "=== writer_postgres.py ==="
wc -l src/writer_postgres.py
head -70 src/writer_postgres.py
elif [ -f "src/writers/writer_postgres.py" ]; then
echo "=== src/writers/writer_postgres.py ==="
wc -l src/writers/writer_postgres.py
head -70 src/writers/writer_postgres.py
fi
Repository: AbsaOSS/EventGate
Length of output: 2382
🏁 Script executed:
# Check event_gate_lambda.py
if [ -f "src/event_gate_lambda.py" ]; then
echo "=== event_gate_lambda.py ==="
sed -n '80,95p' src/event_gate_lambda.py
fi
Repository: AbsaOSS/EventGate
Length of output: 593
Verify constructor behavior aligns with issue #94 requirements.
The writers registry structure is well-designed, but WriterPostgres.__init__ (lines 50-61 in src/writers/writer_postgres.py) violates issue #94's requirement to "avoid any initialization logic in constructors that can throw exceptions."
When POSTGRES_SECRET_NAME and POSTGRES_SECRET_REGION environment variables are set, the constructor immediately calls aws_secrets.get_secret_value(), which can raise BotoCoreError, ClientError, or KeyError. Since this instantiation occurs during Lambda handler initialization, it will cause the entire handler to fail if Secrets Manager is unavailable, resulting in cold-start failures.
Defer secrets loading to lazy initialization in write() or check_health() to ensure the handler initializes successfully regardless of Secrets Manager availability, fully complying with the stated design goal.
🤖 Prompt for AI Agents
In `@src/event_gate_lambda.py` around lines 84 - 88, WriterPostgres.__init__
performs eager secret fetching via aws_secrets.get_secret_value when
POSTGRES_SECRET_NAME/POSTGRES_SECRET_REGION are set, which can raise exceptions
during Lambda cold-start; refactor WriterPostgres so its constructor only stores
config and env values, and move any call to aws_secrets.get_secret_value (and
parsing of returned secret) into a lazy initializer invoked from
WriterPostgres.write() and WriterPostgres.check_health(), ensuring those methods
guard against repeated initialization and surface errors there instead of in
__init__.
| def __init__(self, config: Dict[str, Any]) -> None: | ||
| super().__init__(config) | ||
| secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") | ||
| secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") | ||
|
|
||
| if secret_name and secret_region: | ||
| aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region) | ||
| postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"] | ||
| self._db_config: Dict[str, Any] = json.loads(postgres_secret) | ||
| else: | ||
| self._db_config = {"database": ""} | ||
|
|
Constructor performs AWS calls that can throw exceptions - violates PR objective.
The constructor calls boto3.Session().client() and get_secret_value() (lines 59-60), which can throw BotoCoreError or ClientError. This directly violates the requirement from Issue #94: "avoid any initialization logic in constructors that can throw exceptions."
The EventBridge writer correctly defers client creation to lazy initialization in write(). The Postgres writer should follow the same pattern for loading secrets.
🔧 Suggested fix: Defer secret loading to first write()
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__(config)
- secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
- secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
-
- if secret_name and secret_region:
- aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region)
- postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"]
- self._db_config: Dict[str, Any] = json.loads(postgres_secret)
- else:
- self._db_config = {"database": ""}
-
+ self._db_config: Optional[Dict[str, Any]] = None
logger.debug("Initialized PostgreSQL writer")
+
+ def _load_db_config(self) -> Dict[str, Any]:
+ """Lazily load database configuration from AWS Secrets Manager."""
+ secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
+ secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
+ if secret_name and secret_region:
+ aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region)
+ postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"]
+ return json.loads(postgres_secret)
+ return {"database": ""}
Then in write():
+ if self._db_config is None:
+ self._db_config = self._load_db_config()
if not self._db_config.get("database"):
🤖 Prompt for AI Agents
In `@src/writers/writer_postgres.py` around lines 53 - 64, The constructor
(__init__) is performing AWS Secrets Manager calls (boto3.Session().client and
get_secret_value) which may raise exceptions; change it to only read and store
the environment variables (e.g., secret_name, secret_region) and initialize
_db_config to a safe default (e.g., {"database": ""}) without making network
calls. Add a lazy-initialization step in write() (or a helper like
_load_db_config_if_needed / _ensure_db_config) that, on first write, uses boto3
to fetch and json.loads the secret into self._db_config, catching and surfacing
BotoCoreError/ClientError appropriately (and caching the loaded config so
subsequent writes don't re-fetch). Ensure unique symbols referenced: __init__,
_db_config, write (or _load_db_config_if_needed).
| except (RuntimeError, PsycopgError) as e: | ||
| err_msg = f"The Postgres writer with failed unknown error: {str(e)}" | ||
| logger.exception(err_msg) | ||
| return False, err_msg |
Fix grammatical error in error message.
The error message "The Postgres writer with failed unknown error" should be "The Postgres writer failed with unknown error".
🔧 Proposed fix
- err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
+ err_msg = f"The Postgres writer failed with unknown error: {e!s}"
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| except (RuntimeError, PsycopgError) as e: | |
| err_msg = f"The Postgres writer with failed unknown error: {str(e)}" | |
| logger.exception(err_msg) | |
| return False, err_msg | |
| except (RuntimeError, PsycopgError) as e: | |
| err_msg = f"The Postgres writer failed with unknown error: {e!s}" | |
| logger.exception(err_msg) | |
| return False, err_msg |
🧰 Tools
🪛 Ruff (0.14.11)
293-293: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🤖 Prompt for AI Agents
In `@src/writers/writer_postgres.py` around lines 292 - 295, The error message
inside the except block catching (RuntimeError, PsycopgError) as e is
grammatically incorrect; update the err_msg construction so it reads "The
Postgres writer failed with unknown error" (keep including the exception string
via str(e)), then call logger.exception with that corrected err_msg and return
the same False, err_msg tuple; locate the except block that defines err_msg and
uses logger.exception to make this change.
Overview
This pull request refactors the EventGate writer architecture to use an abstract base class and individual writer classes for Kafka, EventBridge, and PostgreSQL. It replaces global state and initialization functions with lazy-initialized, dependency-injected writer objects, improving modularity, testability, and health-check logic. This PR is purely a refactoring; no new enhancements are introduced.
This pull request refactors the EventGate writer architecture to use an abstract base class and individual writer classes for Kafka, EventBridge, and PostgreSQL. It replaces global state and initialization functions with lazy-initialized, dependency-injected writer objects, improving modularity, testability, and health check logic. This PR should be purely refactoring, no new enhancement should be implemented.
Release Notes
Related
Closes #94
Summary by CodeRabbit
Release Notes